/*
 * Copyright 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package com.google.zetasql;

import static com.google.zetasql.Parameter.normalize;
import static com.google.zetasql.Parameter.normalizeAndValidate;
import static com.google.zetasql.Parameter.serialize;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.errorprone.annotations.CheckReturnValue;
import com.google.zetasql.LocalService.EvaluateQueryRequest;
import com.google.zetasql.LocalService.EvaluateQueryResponse;
import com.google.zetasql.LocalService.PrepareQueryRequest;
import com.google.zetasql.LocalService.PrepareQueryResponse;
import com.google.zetasql.LocalService.PreparedQueryState;
import com.google.zetasql.LocalService.UnprepareQueryRequest;
import io.grpc.StatusRuntimeException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * ZetaSQL query evaluation using Service RPC.
 *
 * <p>A prepared query will create server side state, which should be released by calling
 * {@link #close()} when the query is no longer used. Note that the close() method is also called
 * in finalize(), but you should not rely on that because the Java garbage collector has no idea
 * how much memory is used on C++ side and GC may happen later than necessary.
 *
 * <p>Read the unit tests for examples.
 *
 * <p>This class is thread-safe. All per-instance state is immutable except for the closed flag,
 * which is an atomic that is flipped at most once; concurrent calls to {@link #close()} therefore
 * send at most one unprepare request.
 */
@CheckReturnValue
public final class PreparedQuery implements AutoCloseable {

  // Atomic so that close() (possibly invoked from the finalizer thread) and execute() observe a
  // consistent value without additional locking.
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final long preparedQueryId;
  // expected are the fields we expect the user to provide to execute, but not all are required.
  private final ImmutableMap<String, Type> expectedParameters;
  // referenced is the subset of expected that the analyzer determined are actually required.
  private final ImmutableList<String> referencedParameters;
  private final ImmutableList<SimpleColumn> columns;
  // this is the list of types of the return columns and it's being used when deserializing the
  // values of the evaluate response
  private final ImmutableList<Type> columnsTypes;

  private PreparedQuery(
      long preparedQueryId,
      ImmutableMap<String, Type> expectedParameters,
      ImmutableList<String> referencedParameters,
      ImmutableList<SimpleColumn> columns,
      ImmutableList<Type> columnsTypes) {
    this.preparedQueryId = preparedQueryId;
    this.expectedParameters = expectedParameters;
    this.referencedParameters = referencedParameters;
    this.columns = columns;
    this.columnsTypes = columnsTypes;
  }

  /**
   * Get the list of parameters referenced in this query.
   *
   * <p>The parameters will be returned in lower case, as parameters are case-insensitive. The list
   * of parameters returned from this method are the minimal set that must be provided to execute().
   */
  public ImmutableList<String> getReferencedParameters() {
    return referencedParameters;
  }

  /**
   * Get the list of columns generated by this query.
   *
   * <p>The columns will be associated with an empty table name.
   */
  public ImmutableList<SimpleColumn> getColumns() {
    return columns;
  }

  /**
   * Evaluate the SQL query via Service RPC.
   *
   * @param parameters Map of parameter name:value pairs used in the SQL query.
   * @return The result in the form of a table content.
   * @throws NullPointerException if {@code parameters} is null
   * @throws IllegalStateException if this query has already been closed
   * @throws SqlException if a referenced parameter has no value, or if the RPC fails
   */
  public TableContent execute(Map<String, Value> parameters) {
    Preconditions.checkNotNull(parameters);
    Preconditions.checkState(!closed.get(), "PreparedQuery has already been closed");

    EvaluateQueryRequest request = buildEvaluateRequest(parameters);
    try {
      final EvaluateQueryResponse response = Client.getStub().evaluateQuery(request);
      return TableContent.deserialize(columnsTypes, response.getContent());
    } catch (StatusRuntimeException e) {
      throw new SqlException(e);
    }
  }

  /**
   * Build the evaluate request for this prepared query.
   *
   * <p>Only the parameters referenced by the query are serialized into the request; every one of
   * them must have a value in {@code parameters}.
   */
  private EvaluateQueryRequest buildEvaluateRequest(Map<String, Value> parameters) {
    EvaluateQueryRequest.Builder requestBuilder = EvaluateQueryRequest.newBuilder();
    requestBuilder.setPreparedQueryId(preparedQueryId);

    final ImmutableMap<String, Value> normalizedParameters =
        normalizeAndValidate(parameters, expectedParameters, "query");
    for (String param : referencedParameters) {
      Value value = normalizedParameters.get(param);
      if (value == null) {
        throw new SqlException("Incomplete query parameters " + param);
      }
      requestBuilder.addParams(serialize(param, value));
    }

    return requestBuilder.build();
  }

  /**
   * Release the server side state for this prepared query.
   *
   * <p>This method is idempotent: only the first call sends an unprepare request, later calls
   * (from any thread) return immediately.
   */
  @Override
  public void close() {
    // Flip the flag atomically *before* issuing the RPC so that racing callers (for example user
    // code and the finalizer thread) cannot both send an unprepare request for the same id.
    if (!closed.compareAndSet(false, true)) {
      return;
    }

    try {
      UnprepareQueryRequest request =
          UnprepareQueryRequest.newBuilder().setPreparedQueryId(preparedQueryId).build();
      Client.getStub().unprepareQuery(request);
    } catch (StatusRuntimeException ignored) {
      // Releasing server side state is best effort; there is nothing useful the caller could do
      // with an RPC failure at this point.
    }
  }

  @Override
  protected void finalize() throws Throwable {
    // Release our own resources first and guarantee the superclass finalizer still runs.
    try {
      close();
    } finally {
      super.finalize();
    }
  }

  /** Get a new builder. */
  public static Builder builder() {
    return new PreparedQuery.Builder();
  }

  /**
   * A builder for creating immutable PreparedQuery instances. Example:
   *
   * <pre>{@code
   * public final PreparedQuery queryWithoutCatalog =
   *     PreparedQuery.builder()
   *         .setSql(sqlString)
   *         .setAnalyzerOptions(options)
   *         .prepare();
   *
   * public final PreparedQuery queryWithCatalog =
   *     PreparedQuery.builder()
   *         .setSql(sqlString)
   *         .setAnalyzerOptions(options)
   *         .setCatalog(catalog)
   *         .prepare();
   *
   * public final PreparedQuery queryWithCatalogAndContent =
   *     PreparedQuery.builder()
   *         .setSql(sqlString)
   *         .setAnalyzerOptions(options)
   *         .setCatalog(catalog)
   *         .setTablesContents(tablesContents)
   *         .prepare();
   * }</pre>
   *
   * <p>Builder instances can be reused; it is safe to call {@link #prepare} multiple times to
   * create multiple instances of PreparedQuery. Every time {@link #prepare} is called a new query
   * is registered with the local service and a new instance is returned.
   */
  public static final class Builder {

    private final TypeFactory factory = TypeFactory.nonUniqueNames();
    private String sql;
    private AnalyzerOptions options;
    private SimpleCatalog catalog;
    private Map<String, TableContent> tablesContents;

    private Builder() {}

    /**
     * Set the string SQL query.
     *
     * <p>Mandatory.
     */
    @CanIgnoreReturnValue
    public Builder setSql(String sql) {
      this.sql = sql;
      return this;
    }

    /**
     * Set the options to customize the query's analyzer behavior.
     *
     * <p>Mandatory.
     */
    @CanIgnoreReturnValue
    public Builder setAnalyzerOptions(AnalyzerOptions options) {
      this.options = options;
      return this;
    }

    /**
     * Set the catalog with which this query interacts.
     *
     * <p>Optional.
     */
    @CanIgnoreReturnValue
    public Builder setCatalog(SimpleCatalog catalog) {
      this.catalog = catalog;
      return this;
    }

    /**
     * Set the content for the tables within the catalog.
     *
     * <p>Optional.
     *
     * <p>It can be used only together with a catalog and that catalog needs to be unregistered.
     */
    @CanIgnoreReturnValue
    public Builder setTablesContents(Map<String, TableContent> tablesContents) {
      this.tablesContents = tablesContents;
      return this;
    }

    /**
     * Prepare the query and return a new instance of PreparedQuery.
     *
     * @return a new PreparedQuery backed by freshly registered server side state
     * @throws SqlException if there's an error (not necessarily network/server failure)
     * @throws IllegalStateException if the SQL string is null, if the options is null, if tables
     *     contents were provided without a catalog, or if the provided catalog is a registered
     *     catalog and tables contents were provided. At the moment changing the contents of the
     *     tables of a registered catalog is not supported yet.
     */
    public PreparedQuery prepare() {
      Preconditions.checkState(sql != null, "SQL string must not be null");
      Preconditions.checkState(options != null, "AnalyzerOptions must not be null");
      Preconditions.checkState(
          tablesContents == null || (catalog != null && !catalog.isRegistered()),
          "Can only provide tables contents with a catalog that has not been registered");

      // The same descriptor set builder is used to serialize the request and to deserialize the
      // column types of the response.
      FileDescriptorSetsBuilder fileDescriptorSetsBuilder = new FileDescriptorSetsBuilder();
      final PrepareQueryRequest request = buildPrepareRequest(fileDescriptorSetsBuilder);
      try {
        final PrepareQueryResponse response = Client.getStub().prepareQuery(request);
        return processResponse(response.getPrepared(), fileDescriptorSetsBuilder);
      } catch (StatusRuntimeException e) {
        throw new SqlException(e);
      }
    }

    /**
     * Build the prepare request from the builder's current settings, collecting every file
     * descriptor used along the way into {@code fileDescriptorSetsBuilder}.
     */
    private PrepareQueryRequest buildPrepareRequest(
        FileDescriptorSetsBuilder fileDescriptorSetsBuilder) {
      fileDescriptorSetsBuilder.addAllFileDescriptors(BuiltinDescriptorPool.getInstance());

      PrepareQueryRequest.Builder builder = PrepareQueryRequest.newBuilder();
      builder.setSql(sql);
      builder.setOptions(options.serialize(fileDescriptorSetsBuilder));

      Map<DescriptorPool, Long> catalogRegisteredDescriptorPoolIds = ImmutableMap.of();
      if (catalog != null) {
        if (catalog.isRegistered()) {
          // A registered catalog is referenced by id; its descriptor pools are added to the
          // builder and their registered ids are passed along with the descriptor pool list.
          catalogRegisteredDescriptorPoolIds = catalog.getRegisteredDescriptorPoolIds();
          for (DescriptorPool pool : catalogRegisteredDescriptorPoolIds.keySet()) {
            fileDescriptorSetsBuilder.addAllFileDescriptors(pool);
          }
          builder.setRegisteredCatalogId(catalog.getRegisteredId());
        } else {
          builder.setSimpleCatalog(catalog.serialize(fileDescriptorSetsBuilder));
          if (tablesContents != null) {
            tablesContents.forEach((t, c) -> builder.putTableContent(t, c.serialize()));
          }
        }
      } else {
        // A catalog must be provided, so we request a default initialized catalog with all of
        // the builtin functions.
        builder
            .getSimpleCatalogBuilder()
            .getBuiltinFunctionOptionsBuilder()
            .setLanguageOptions(builder.getOptions().getLanguageOptions());
      }
      builder.setDescriptorPoolList(
          DescriptorPoolSerializer.createDescriptorPoolListWithRegisteredIds(
              fileDescriptorSetsBuilder, catalogRegisteredDescriptorPoolIds));

      return builder.build();
    }

    /**
     * Turn the prepared state returned by the service into a PreparedQuery, deserializing the
     * output columns with the descriptor pools collected while building the request.
     */
    private PreparedQuery processResponse(
        PreparedQueryState preparedQueryState,
        FileDescriptorSetsBuilder fileDescriptorSetsBuilder) {
      long preparedQueryId = preparedQueryState.getPreparedQueryId();

      ImmutableMap<String, Type> expectedParameters = normalize(options.getQueryParameters());
      ImmutableList<String> referencedParameters =
          ImmutableList.copyOf(preparedQueryState.getReferencedParametersList());

      // NOTE: The deserialized columns will belong to a table with an empty name
      ImmutableList.Builder<SimpleColumn> columnsBuilder = ImmutableList.builder();
      ImmutableList.Builder<Type> columnsTypesBuilder = ImmutableList.builder();
      preparedQueryState
          .getColumnsList()
          .forEach(
              colProto -> {
                SimpleColumn column =
                    SimpleColumn.deserialize(
                        colProto, "", fileDescriptorSetsBuilder.getDescriptorPools(), factory);
                columnsBuilder.add(column);
                columnsTypesBuilder.add(column.getType());
              });

      return new PreparedQuery(
          preparedQueryId,
          expectedParameters,
          referencedParameters,
          columnsBuilder.build(),
          columnsTypesBuilder.build());
    }
  }
}
